/*
 * (C) 2007-2017 Alibaba Group Holding Limited
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 *
 * See the AUTHORS file for names of contributors.
 *
 */

#include <atomic.h>

#include "leveldb/filter_policy.h"
#include "leveldb/slice.h"
#include "util/hash.h"

#include "ldb_define.hpp"
#include "ldb_cache_stat.hpp"
#include "ldb_stat_manager.hpp"
#include "common/log.hpp"

namespace tair
{
  namespace storage 
  { 
    namespace ldb
    {
      // Pair of atomic counters describing bloom filter lookups: the number
      // of lookups performed and the number of those reported as misses.
      // Both counters are zeroed on construction and by reset().
      struct LdbBloomStat {
        // Number of lookups recorded since the last reset().
        atomic_t get_count_;
        // Number of misses recorded since the last reset().  Misses are
        // expected to be the minority case, so misses (rather than hits)
        // are what gets counted.
        atomic_t miss_count_;

        LdbBloomStat() { reset(); }

        // Record one lookup.
        void add_get_count() { atomic_inc(&get_count_); }

        // Record one miss.
        void add_miss_count() { atomic_inc(&miss_count_); }

        // Current number of recorded lookups.
        int32_t get_count() { return atomic_read(&get_count_); }

        // Current number of recorded misses.
        int32_t miss_count() { return atomic_read(&miss_count_); }

        // Set both counters back to zero.  The two stores are separate
        // atomic_set() calls, one per counter.
        void reset() {
          atomic_set(&get_count_, 0);
          atomic_set(&miss_count_, 0);
        }
      };

      // Hash used by the bloom filter: hashes `key` with leveldb::Hash and
      // the fixed seed 0xbc9f1d34, ignoring the first LDB_FILTER_SKIP_SIZE
      // bytes of the key.
      // NOTE(review): the key length is not checked here -- presumably every
      // key is at least LDB_FILTER_SKIP_SIZE bytes long; confirm with callers.
      static uint32_t BloomHash(const leveldb::Slice& key) {
        const char* hashed_begin = key.data() + LDB_FILTER_SKIP_SIZE;
        const size_t hashed_len = key.size() - LDB_FILTER_SKIP_SIZE;
        return leveldb::Hash(hashed_begin, hashed_len, 0xbc9f1d34);
      }

      // leveldb::FilterPolicy implementation that builds and probes bloom
      // filters hashed with BloomHash(), and records per-area lookup
      // statistics both in the static old_stat_ table and in the
      // LdbStatManager handed to the constructor.
      //
      // Filter layout produced by CreateFilter(): a bit array of whole bytes
      // followed by one trailing byte holding the number of probes (k_).
      class LdbBloomFilterPolicy : public leveldb::FilterPolicy {
      private:
        size_t bits_per_key_;
        // Number of probes per key, always kept within [1, 30].
        size_t k_;
        LdbStatManager* stat_;
      public:
        // Counters shared by every policy instance (static member),
        // indexed by the area decoded from the looked-up key.
        static LdbBloomStat old_stat_[TAIR_MAX_AREA_COUNT];

      public:
        // bits_per_key: bloom filter bits budgeted for each key.
        // stat: statistics sink used by KeyMayMatch(); it is dereferenced
        //       there without a null check.
        explicit LdbBloomFilterPolicy(int bits_per_key, LdbStatManager* stat)
          : bits_per_key_(bits_per_key),
            // 0.69 =~ ln(2).  Truncating (rounding down) is intentional: it
            // reduces the probing cost a little bit.
            k_(static_cast<size_t>(bits_per_key * 0.69)),
            stat_(stat) {
          if (k_ < 1) {
            k_ = 1;
          } else if (k_ > 30) {
            k_ = 30;
          }
        }

        // Name identifying this filter policy.
        virtual const char* Name() const {
          return "ldb.LdbBloomFilter";
        }

        // Appends to *dst a bloom filter covering keys[0 .. n-1].  Existing
        // content of *dst is left untouched; the filter starts at the old
        // end of the string.
        virtual void CreateFilter(const leveldb::Slice* keys, int n, std::string* dst) const {
          // Size the filter in bits.  A very small n would give a very high
          // false positive rate, so a minimum length of 64 bits is enforced.
          size_t wanted_bits = n * bits_per_key_;
          if (wanted_bits < 64) {
            wanted_bits = 64;
          }

          // Round up to whole bytes and work with the rounded bit count.
          const size_t filter_bytes = (wanted_bits + 7) / 8;
          const size_t filter_bits = filter_bytes * 8;

          const size_t filter_offset = dst->size();
          dst->resize(filter_offset + filter_bytes, 0);
          // The trailing byte remembers the number of probes.
          dst->push_back(static_cast<char>(k_));

          char* filter = &(*dst)[filter_offset];
          for (int32_t idx = 0; idx < n; ++idx) {
            // Double hashing generates the probe sequence; see the analysis
            // in [Kirsch,Mitzenmacher 2006].
            uint32_t h = BloomHash(keys[idx]);
            const uint32_t delta = (h >> 17) | (h << 15);  // rotate right by 17 bits
            for (size_t probe = 0; probe < k_; ++probe) {
              const uint32_t bitpos = h % filter_bits;
              filter[bitpos / 8] |= (1 << (bitpos % 8));
              h += delta;
            }
          }
        }

        // Returns false when `bloom_filter` shows that `key` is absent, true
        // otherwise.  Every call bumps the area's get counter, every false
        // result also bumps its miss counter, and the outcome is reported to
        // stat_ as TAIR_RETURN_SUCCESS (true) or TAIR_RETURN_FAILED (false).
        // NOTE(review): the decoded area indexes old_stat_ without a range
        // check -- presumably it is always below TAIR_MAX_AREA_COUNT; confirm.
        virtual bool KeyMayMatch(const leveldb::Slice& key, const leveldb::Slice& bloom_filter) const {
          PROFILER_BEGIN("bloom");
          const int area = LdbKey::decode_area_with_key(key.data());
          LdbBloomStat& area_stat = old_stat_[area];
          area_stat.add_get_count();

          bool may_match = true;
          const size_t len = bloom_filter.size();
          if (len < 2) {
            // Too short to hold a bit array plus the probe-count byte.
            may_match = false;
          } else {
            const char* array = bloom_filter.data();
            const size_t bits = (len - 1) * 8;

            // Use the probe count stored in the filter itself, so filters
            // built with different parameters can still be read.
            const size_t k = array[len - 1];

            // k > 30 is reserved for potentially new encodings for short
            // bloom filters; such filters are treated as a match, so the
            // probing below is skipped and may_match stays true.
            if (k <= 30) {
              uint32_t h = BloomHash(key);
              const uint32_t delta = (h >> 17) | (h << 15);  // rotate right by 17 bits
              for (size_t probe = 0; probe < k; ++probe) {
                const uint32_t bitpos = h % bits;
                if ((array[bitpos / 8] & (1 << (bitpos % 8))) == 0) {
                  // One clear bit is enough to rule the key out.
                  may_match = false;
                  break;
                }
                h += delta;
              }
            }
          }

          // Common epilogue: miss counter first, then close the profiler
          // section, then report the outcome to the stat manager.
          if (!may_match) {
            area_stat.add_miss_count();
          }
          PROFILER_END();
          if (may_match) {
            stat_->update_bf_get(area, TAIR_RETURN_SUCCESS);
          } else {
            stat_->update_bf_get(area, TAIR_RETURN_FAILED);
          }
          return may_match;
        }
      };

      // Out-of-class definition providing the storage for the static counter
      // table declared in LdbBloomFilterPolicy (TAIR_MAX_AREA_COUNT entries).
      LdbBloomStat LdbBloomFilterPolicy::old_stat_[TAIR_MAX_AREA_COUNT];

      // Publishes the bloom filter counters accumulated in
      // LdbBloomFilterPolicy::old_stat_ into `ldb_cache_stat`, then clears
      // them.  `ldb_cache_stat` is indexed with the same index as old_stat_,
      // up to TAIR_MAX_AREA_COUNT.  Entries whose get count is not positive
      // are skipped: nothing is written for them and they are not reset.
      void get_bloom_stats(cache_stat* ldb_cache_stat) {
        for (size_t area = 0; area < TAIR_MAX_AREA_COUNT; ++area) {
          LdbBloomStat& counters = LdbBloomFilterPolicy::old_stat_[area];
          if (counters.get_count() <= 0) {
            continue;
          }
          // NOTE: the existing cache stat structure is borrowed temporarily
          // to carry the bloom counters (acknowledged as ugly; still a TODO).
          SET_LDB_STAT_BLOOM_GET_COUNT(&ldb_cache_stat[area], counters.get_count());
          SET_LDB_STAT_BLOOM_MISS_COUNT(&ldb_cache_stat[area], counters.miss_count());
          counters.reset();
        }
      }
    }  // namespace ldb
  }  // namespace storage
}  // namespace tair

